package com.iotdb.zjc.demo.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Kafka producer component: publishes string messages to a topic through the
 * injected {@link KafkaTemplate} and logs the outcome of each send.
 *
 * @author zjc
 * @version 1.0
 * @since 2021/11/19
 */
@Component
public class KafkaProducer {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends {@code message} to the Kafka topic {@code topicName}.
     *
     * <p>The result of the send is reported through callbacks registered on the
     * future returned by {@code KafkaTemplate.send}: on success the topic,
     * partition and offset of the written record are logged; on failure the
     * error is logged together with its stack trace. Any exception thrown by
     * the {@code send} call itself is caught and logged, so this method never
     * propagates an exception to the caller.
     *
     * <p>Note: the method name keeps its historical spelling ("Mesage") so that
     * existing callers continue to compile.
     *
     * @param topicName name of the topic to publish to
     * @param message   payload to publish
     */
    public void sendMesage(String topicName, String message) {
        log.info("send message to kafka {}{}", topicName, message);
        try {
            // This send is not transactional. If the send must take part in a Kafka
            // transaction (so that a later failure prevents the message from being
            // published), wrap it in kafkaTemplate.executeInTransaction(...).
            kafkaTemplate.send(topicName, message).addCallback(success -> {
                // Topic the record was written to.
                String topic = success.getRecordMetadata().topic();
                // Partition the record was written to.
                int partition = success.getRecordMetadata().partition();
                // Offset of the record within that partition.
                long offset = success.getRecordMetadata().offset();
                log.info("发送消息成功:{}-{}-{}", topic, partition, offset);
            }, failure ->
                // Pass the throwable as the last argument so the stack trace is kept.
                log.error("发送消息失败:{}", failure.getMessage(), failure));
        } catch (Exception e) {
            // Best-effort send: log the failure with its full stack trace instead of
            // concatenating the exception into the message.
            log.error("send message error, topic={}", topicName, e);
        }
    }

}
